Add MapAsync for concurrent collection processing #2408

Open
GarrettBeatty wants to merge 4 commits into dev from gcbeatty/durable-map
GarrettBeatty commented Jun 5, 2026

#2216

What

Adds IDurableContext.MapAsync to Amazon.Lambda.DurableExecution. MapAsync processes a collection in parallel with one child context per item, mirroring the Python/JS/Java SDKs where Map is a sibling of Parallel sharing one concurrency engine. It reuses the IBatchResult<T> family and concurrency/completion machinery introduced by ParallelAsync in #2375.

Public API:

| Type | Purpose |
| --- | --- |
| IDurableContext.MapAsync<TItem, TResult>(IReadOnlyList<TItem> items, Func<IDurableContext, TItem, int, IReadOnlyList<TItem>, CancellationToken, Task<TResult>> func, string? name, MapConfig? config, CancellationToken) | Process each item concurrently in its own child context, returning Task<IBatchResult<TResult>>. The per-item func receives the durable context, the item, its zero-based index, the full source list (matching the Python/JS SDKs), and a CancellationToken linking the caller's token with the SDK's workflow-shutdown signal (also tripped cooperatively when a sibling satisfies the CompletionConfig and the map short-circuits). |
| MapConfig | MaxConcurrency (int?, null = unlimited, must be ≥1), CompletionConfig, NestingType, ItemNamer. Defaults CompletionConfig to AllCompleted() (permissive), matching Python/Java Map; this intentionally differs from ParallelConfig's AllSuccessful(). NestingType defaults to Nested; NestingType.Flat is not yet supported and throws NotSupportedException when invoked. |
| MapConfig.ItemNamer | Func<object, int, string>? that supplies a per-item branch name (item + zero-based index) surfaced in traces and on IBatchItem<T>.Name. When null, branches are named by index ("0", "1", ...). (No ItemBatcher: it is not implemented in any reference SDK.) |
| MapException | Thrown when CompletionConfig signals FailureToleranceExceeded, so callers can distinguish Map failures from Parallel failures. Exposes a type-erased IBatchResult? Result (cast to IBatchResult<T> when the item type is known) and the CompletionReason. Base type for map failures; catching it stays forward-compatible. |
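
The naming rule described for ItemNamer can be sketched in isolation. This is a minimal illustration, not the SDK's code: the class name `BranchNameSketch` and the method `Resolve` are hypothetical, and only the `Func<object, int, string>?` delegate shape and the index fallback come from the table above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical helper; the SDK's internal naming code is not shown in this PR page.
public static class BranchNameSketch
{
    // Returns one branch name per item: the namer's result when a namer is
    // supplied, otherwise the zero-based index rendered as a string.
    public static IReadOnlyList<string> Resolve<TItem>(
        IReadOnlyList<TItem> items,
        Func<object, int, string>? itemNamer = null)
    {
        var names = new string[items.Count];
        for (var i = 0; i < items.Count; i++)
        {
            names[i] = itemNamer is null
                ? i.ToString(CultureInfo.InvariantCulture)
                : itemNamer(items[i]!, i); // item is boxed to object, as in the delegate shape above
        }
        return names;
    }
}
```

Calling `BranchNameSketch.Resolve(items)` without a namer yields "0", "1", and so on, while passing a namer such as `(item, i) => $"order-{item}-{i}"` gives each branch a caller-chosen name.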

Behavior: by default (AllCompleted) every item runs regardless of per-item failures, which are captured on the corresponding IBatchItem<T> and surfaced via IBatchResult<T>.Failed rather than thrown. The map throws MapException only when the CompletionConfig criteria are violated. For fail-fast semantics, set CompletionConfig to AllSuccessful() or call IBatchResult<T>.ThrowIfError() on the result.
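
To make the permissive default concrete, here is a self-contained sketch of the same idea using plain tasks. It does not use the durable SDK: `MapSketch`, `MapAllCompletedAsync`, and `ItemOutcome<TResult>` are hypothetical names, there is no checkpointing or replay, and the bounded concurrency is an assumption implemented with `SemaphoreSlim`. It shows only that every item runs and that a per-item failure is recorded instead of thrown.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for one batch entry; not the SDK's IBatchItem<T>.
public sealed record ItemOutcome<TResult>(int Index, TResult? Result, Exception? Error)
{
    public bool Succeeded => Error is null;
}

public static class MapSketch
{
    // Runs func for every item, at most maxConcurrency at a time (null = unlimited).
    // A failing item is captured on its outcome and does not stop its siblings.
    public static async Task<IReadOnlyList<ItemOutcome<TResult>>> MapAllCompletedAsync<TItem, TResult>(
        IReadOnlyList<TItem> items,
        Func<TItem, int, CancellationToken, Task<TResult>> func,
        int? maxConcurrency = null,
        CancellationToken cancellationToken = default)
    {
        if (maxConcurrency is < 1)
            throw new ArgumentOutOfRangeException(nameof(maxConcurrency), "Must be at least 1 when set.");

        using SemaphoreSlim? gate = maxConcurrency is int limit ? new SemaphoreSlim(limit) : null;

        async Task<ItemOutcome<TResult>> RunOneAsync(TItem item, int index)
        {
            // Acquire outside the try block so the finally only releases a held slot.
            if (gate is not null) await gate.WaitAsync(cancellationToken);
            try
            {
                var value = await func(item, index, cancellationToken);
                return new ItemOutcome<TResult>(index, value, null);
            }
            catch (Exception ex)
            {
                return new ItemOutcome<TResult>(index, default, ex);
            }
            finally
            {
                gate?.Release();
            }
        }

        var tasks = items.Select((item, index) => RunOneAsync(item, index)).ToArray();
        return await Task.WhenAll(tasks);
    }
}
```

A caller wanting fail-fast behaviour on top of this sketch would inspect the outcomes and throw on the first one with a non-null `Error`, which is roughly the role the description assigns to AllSuccessful() and ThrowIfError() in the real API.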

Implementation notes:

  • Extracts a ConcurrentOperation<T> base holding all orchestration, completion, checkpoint, and replay logic. ParallelOperation and MapOperation are thin subclasses supplying only the per-unit (name, func), sub-type labels, and failure-exception factory. Adds Map / MapItem to OperationSubTypes.
  • Generalizes ParallelSummary / ParallelJsonContext into shared BatchSummary / BatchJsonContext.

Per-item checkpoint payloads are serialized via the ILambdaSerializer registered on ILambdaContext.Serializer — the same pattern as ParallelAsync / StepAsync / RunInChildContextAsync. The AOT story is determined entirely by which serializer the user registers with the runtime (e.g., SourceGeneratorLambdaJsonSerializer<TContext>). MapConfig does not expose a serializer slot.

Testing

24 new unit tests in MapOperationTests.cs, mirroring the Parallel set:

  • CompletionConfig matrix: AllSuccessful, AllCompleted, FirstSuccessful, MinSuccessful, ToleratedFailureCount, ToleratedFailurePercentage — both pass and fail thresholds.
  • Concurrency: MaxConcurrency enforced; unbounded when null.
  • Replay determinism: mixed-status replay, named vs. unnamed items via ItemNamer.
  • IBatchResult<T> accessors and GetResults / GetErrors / ThrowIfError semantics.

6 new integration functions/tests mirroring the Parallel set (require AWS credentials to run): MapHappyPath, MapPartialFailure, MapFailureTolerance, MapFirstSuccessful, MapMaxConcurrency, MapReplayDeterminism.

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.


COPY bin/publish/ ${LAMBDA_TASK_ROOT}

ENTRYPOINT ["/var/task/bootstrap"]

Copilot AI left a comment


Pull request overview

This PR adds a new concurrent collection-processing primitive, IDurableContext.MapAsync, to the Amazon.Lambda.DurableExecution SDK. It mirrors the existing ParallelAsync concurrency engine while introducing Map-specific defaults (notably a permissive AllCompleted() completion policy) and Map-specific exception semantics.

Changes:

  • Introduces MapAsync + MapConfig + MapException, and a new MapOperation built on shared concurrency orchestration.
  • Refactors the ParallelAsync implementation by extracting shared logic into ConcurrentOperation<T> and generalizing parent checkpoint summaries to BatchSummary.
  • Adds unit and integration tests plus integration test Lambda function projects for Map scenarios (happy path, partial failure, max concurrency, first-successful, failure tolerance, replay determinism).

Reviewed changes

Copilot reviewed 41 out of 41 changed files in this pull request and generated 3 comments.

| File | Description |
| --- | --- |
| MAP-IMPLEMENTATION-PLAN.md | Design/implementation plan for MapAsync and related refactors. |
| Libraries/test/Amazon.Lambda.DurableExecution.Tests/ParallelOperationTests.cs | Updates replay JSON payload shape from Branches to Units for parallel tests. |
| Libraries/test/Amazon.Lambda.DurableExecution.Tests/MapOperationTests.cs | Adds comprehensive unit tests for MapAsync behavior, replay, naming, and completion policies. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/MapReplayDeterminismFunction.csproj | New integration test function project for map replay determinism. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Function.cs | Workflow that forces suspend/resume to validate per-item replay determinism. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapReplayDeterminismFunction/Dockerfile | Container packaging for the replay determinism test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/MapPartialFailureFunction.csproj | New integration test function project for permissive-default partial failure. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Function.cs | Workflow validating Map default AllCompleted() preserves partial failures without failing workflow. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapPartialFailureFunction/Dockerfile | Container packaging for the partial failure test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/MapMaxConcurrencyFunction.csproj | New integration test function project for MaxConcurrency. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Function.cs | Workflow validating dispatch throttling via durable waits + timestamps. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapMaxConcurrencyFunction/Dockerfile | Container packaging for the max concurrency test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/MapHappyPathFunction.csproj | New integration test function project for map happy path. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Function.cs | Workflow validating step-per-item processing and ItemNamer visibility. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapHappyPathFunction/Dockerfile | Container packaging for the happy path test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/MapFirstSuccessfulFunction.csproj | New integration test function project for first-successful short-circuit. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Function.cs | Workflow validating FirstSuccessful() and started/unfinished item reporting. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFirstSuccessfulFunction/Dockerfile | Container packaging for the first-successful test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/MapFailureToleranceFunction.csproj | New integration test function project for failure tolerance exceeded. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Function.cs | Workflow validating failure tolerance triggers MapException and fails workflow. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/TestFunctions/MapFailureToleranceFunction/Dockerfile | Container packaging for the failure tolerance test function. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapReplayDeterminismTest.cs | Integration test asserting deterministic item operation IDs and replayed step results. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapPartialFailureTest.cs | Integration test asserting permissive default allows partial failure with SUCCEEDED workflow. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapMaxConcurrencyTest.cs | Integration test asserting MaxConcurrency throttles dispatch waves. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapHappyPathTest.cs | Integration test validating end-to-end happy path and history events/names. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFirstSuccessfulTest.cs | Integration test validating first-successful short-circuit behavior. |
| Libraries/test/Amazon.Lambda.DurableExecution.IntegrationTests/MapFailureToleranceTest.cs | Integration test validating failure tolerance triggers FAILED workflow + MapException indication. |
| Libraries/src/Amazon.Lambda.DurableExecution/Operation.cs | Adds OperationSubTypes.Map and OperationSubTypes.MapItem. |
| Libraries/src/Amazon.Lambda.DurableExecution/MapConfig.cs | Adds Map configuration object with permissive default completion config and ItemNamer. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelSummary.cs | Removes the Parallel-specific summary type (replaced by shared BatchSummary). |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelOperation.cs | Refactors ParallelOperation to a thin subclass of ConcurrentOperation. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ParallelJsonContext.cs | Removes Parallel-specific JSON context (replaced by BatchJsonContext). |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/MapOperation.cs | Adds MapOperation as a thin subclass of ConcurrentOperation. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/ConcurrentOperation.cs | Adds extracted shared orchestration/replay/checkpoint engine for Parallel + Map. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchSummary.cs | Adds shared parent checkpoint payload type for concurrent ops. |
| Libraries/src/Amazon.Lambda.DurableExecution/Internal/BatchJsonContext.cs | Adds shared source-gen JSON context for BatchSummary payloads. |
| Libraries/src/Amazon.Lambda.DurableExecution/IDurableContext.cs | Adds public MapAsync<TItem, TResult> API with XML docs. |
| Libraries/src/Amazon.Lambda.DurableExecution/DurableExecutionException.cs | Adds MapException type parallel to ParallelException. |
| Libraries/src/Amazon.Lambda.DurableExecution/DurableContext.cs | Wires MapAsync into the durable context runtime implementation. |
| Docs/durable-execution-design.md | Updates design docs: Map default completion behavior and removes ItemBatcher references. |


Comment on lines +14 to +21
internal sealed class BatchSummary
{
    [JsonPropertyName("CompletionReason")]
    public string? CompletionReason { get; set; }

    [JsonPropertyName("Units")]
    public IList<BatchUnitSummary> Units { get; set; } = new List<BatchUnitSummary>();
}

GarrettBeatty commented Jun 5, 2026


this is fine - since we are in preview

Copilot AI left a comment


Pull request overview

Copilot reviewed 41 out of 41 changed files in this pull request and generated 3 comments.

@GarrettBeatty GarrettBeatty force-pushed the gcbeatty/durable-parallel branch 3 times, most recently from c88ad8a to d347067 Compare June 12, 2026 15:24
@GarrettBeatty GarrettBeatty force-pushed the gcbeatty/durable-map branch from e0b6f5f to dcda8da Compare June 12, 2026 18:54
Base automatically changed from gcbeatty/durable-parallel to dev June 15, 2026 17:47
@GarrettBeatty GarrettBeatty force-pushed the gcbeatty/durable-map branch from dcda8da to fb4572a Compare June 16, 2026 15:32
using SdkErrorObject = Amazon.Lambda.Model.ErrorObject;
using SdkOperationUpdate = Amazon.Lambda.Model.OperationUpdate;

namespace Amazon.Lambda.DurableExecution.Internal;

Contributor Author


all the common logic (mostly everything) shared between map and parallel was moved to ConcurrentOperation

@GarrettBeatty GarrettBeatty force-pushed the gcbeatty/durable-map branch from fb4572a to 02c23f5 Compare June 16, 2026 16:26
…CCEED

The parent context for both ParallelAsync and MapAsync now always
checkpoints as Action=SUCCEED with the summary (including
CompletionReason) in the Payload field, matching the wire format used
by the Python, JS, and Java SDKs.

Previously the C# SDK emitted Action=FAIL for FailureToleranceExceeded,
which left ContextDetails.Result empty (Payload is forbidden on FAIL
updates) and made it impossible for replay to reconstruct a caught
ParallelException/MapException deterministically. The exception is now
thrown SDK-side after the checkpoint based on the CompletionReason in
the reconstructed BatchResult.
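
The checkpoint-then-throw flow in this commit message can be sketched with System.Text.Json. This is an illustration under assumptions, not the SDK's implementation: the `BatchSummarySketch` shape follows the BatchSummary snippet quoted in the review, but the unit fields (`Index`, `Status`), the wire value "FailureToleranceExceeded", and the names `ReplaySketch` and `MapFailedSketchException` are hypothetical.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Mirrors the BatchSummary shape quoted in the review; the unit fields are assumptions.
public sealed class BatchSummarySketch
{
    [JsonPropertyName("CompletionReason")]
    public string? CompletionReason { get; set; }

    [JsonPropertyName("Units")]
    public IList<BatchUnitSketch> Units { get; set; } = new List<BatchUnitSketch>();
}

public sealed class BatchUnitSketch
{
    [JsonPropertyName("Index")]
    public int Index { get; set; }

    [JsonPropertyName("Status")]
    public string? Status { get; set; }
}

// Hypothetical exception standing in for MapException / ParallelException.
public sealed class MapFailedSketchException : Exception
{
    public BatchSummarySketch Summary { get; }

    public MapFailedSketchException(BatchSummarySketch summary)
        : base($"Map failed: {summary.CompletionReason}")
    {
        Summary = summary;
    }
}

public static class ReplaySketch
{
    // The parent always checkpoints a payload, even for a failed batch,
    // so the same summary is available again on replay.
    public static string Checkpoint(BatchSummarySketch summary) =>
        JsonSerializer.Serialize(summary);

    // On first run and on replay alike, the failure is raised from the
    // reconstructed summary rather than from a FAIL checkpoint.
    public static BatchSummarySketch Reconstruct(string payload)
    {
        var summary = JsonSerializer.Deserialize<BatchSummarySketch>(payload)
            ?? throw new InvalidOperationException("Empty checkpoint payload.");

        if (summary.CompletionReason == "FailureToleranceExceeded") // assumed wire value
            throw new MapFailedSketchException(summary);

        return summary;
    }
}
```

Because the exception is derived from the stored summary, a workflow that catches it sees the same exception on every replay, which is the determinism property the commit message describes.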
@GarrettBeatty GarrettBeatty marked this pull request as ready for review June 16, 2026 17:46
@GarrettBeatty GarrettBeatty requested review from a team as code owners June 16, 2026 17:46
@GarrettBeatty GarrettBeatty requested review from normj and philasmar June 16, 2026 17:46

@normj normj left a comment


Approved with one nit

private async Task CheckpointParentResultAsync(
    BatchResult<T> result,
    CompletionReason completionReason,
    DurableExecutionException? failureException,

Member


The failureException parameter is not used in this private method. Should it be removed or is it a miss?
